本篇文章将介绍Spark SQL中的DataFrame,关于DataFrame的介绍可以参考:
https://blog.csdn.net/lemonzhaotao/article/details/80211231
在本篇文章中,将介绍RDD转换为DataFrame的2种方式
官网之RDD转DF:
http://spark.apache.org/docs/latest/sql-programming-guide.html#interoperating-with-rdds
DataFrame 与 RDD 的交互
Spark SQL它支持两种不同的方式转换已经存在的RDD到DataFrame
方法一
第一种方式是使用反射的方式,用反射去推倒出来RDD里面的schema。这个方式简单,但是不建议使用,因为在工作当中,使用这种方式是有限制的。
对于以前的版本来说,case class最多支持22个字段如果超过了22个字段,我们就必须要自己开发一个类,实现product接口才行。因此这种方式虽然简单,但是不通用;因为生产中的字段是非常非常多的,是不可能只有20来个字段的。
示例:
1 | /** |
方法二
创建一个DataFrame,使用编程的方式 这个方式用的非常多。通过编程方式指定schema ,对于第一种方式的schema其实定义在了case class里面了。
官网解读:
当我们的case class不能提前定义(因为业务处理的过程当中,你的字段可能是在变化的),因此使用case class很难去提前定义。
使用该方式创建DF的三大步骤:
- Create an RDD of Rows from the original RDD;
- Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
- Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.
示例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26/**
* convert rdd to dataframe 2
* @param spark
*/
private def runProgrammaticSchemaExample(spark:SparkSession): Unit ={
// 1.转成RDD
val rdd = spark.sparkContext.textFile("E:/大数据/data/people.txt")
// 2.定义schema,带有StructType的
// 定义schema信息
val schemaString = "name age"
// 对schema信息按空格进行分割
// 最终fileds里包含了2个StructField
val fields = schemaString.split(" ")
// 字段类型,字段名称判断是不是为空
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// 3.把我们的schema信息作用到RDD上
// 这个RDD里面包含了一些行
// 形成Row类型的RDD
val rowRDD = rdd.map(_.split(","))
.map(x => Row(x(0), x(1).trim))
// 通过SparkSession创建一个DataFrame
// 传进来一个rowRDD和schema,将schema作用到rowRDD上
val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF.show()
}
[扩展]生产上创建DataFrame的代码举例
在实际生产环境中,我们其实选择的是方式二这种进行创建DataFrame的,这里将展示部分代码:
Schema的定义
1 | object AccessConvertUtil { |
创建DataFrame
1 | object SparkStatCleanJob { |